MQTTに配信したメッセージをfluentdでS3に保存する
AWS IoT が使えるなら、Device Gateway の MQTT ブローカーに配信したメッセージは Rules 定義により簡単に S3 に保存できます。 今回は、AWS IoT が使えない制約があり、代わりに EC2 上で MQTT ブローカーの Mosquitto を動かしているようなケースにおいて、MQTT に配信したメッセージを fluentd を使って S3 に保存する手順を紹介します。
今回の手順では Mosquitto 固有の機能・設定は使っていないため、Mosquitto 以外の MQTT ブローカーでも同じ手順で動作するかと思います。
事前準備
EC2 上で Mosquitto をインストールしておきます。
同じインスタンスに td-agent をインストールし、S3 にオブジェクトを PUT します。 AWS クレデンシャルと権限管理をするために、EC2 インスタンスには S3 の Put 系更新権限がついたIAM Role 付きで 起動ししておきます。
fluentd エージェントのインストール
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh $ td-agent --version td-agent 0.12.12
プラグインのインストール
fluentd のプラグインに関して
- input には fluent-plugin-mqtt
- output には flutent-plugin-s3
を使います。
後者は標準でインストールされているため、前者をインストールします。
$ sudo td-agent-gem install fluent-plugin-mqtt WARN: Unresolved specs during Gem::Specification.reset: json (>= 1.4.3) WARN: Clearing out unresolved specs. Please report a bug if this causes problems. Fetching: mqtt-0.3.1.gem (100%) Successfully installed mqtt-0.3.1 Fetching: fluent-plugin-mqtt-0.0.4.gem (100%) Successfully installed fluent-plugin-mqtt-0.0.4 Parsing documentation for fluent-plugin-mqtt-0.0.4 Installing ri documentation for fluent-plugin-mqtt-0.0.4 Parsing documentation for mqtt-0.3.1 Installing ri documentation for mqtt-0.3.1 Done installing documentation for fluent-plugin-mqtt, mqtt after 0 seconds 2 gems installed
fluentd エージェントの設定変更
fluentd エージェントの設定ファイルは /etc/td-agent/td-agent.conf
にあります。
インプットの定義
今回はトピックが "foo/" で始まるメッセージを転送の対象とします。
<source> type mqtt port 1883 topic foo/# </source>
- type をプラグイン名 mqtt にします。
- topic でトピックを "foo/#" で絞ります。
アウトプットの定義
<match foo.bar> type s3 s3_bucket YOUR_BUCKET_NAME path mqtt/foo/bar s3_object_key_format %{path}/%{time_slice}_HOSTNAME_%{index}.%{file_extension} buffer_path /var/log/td-agent/s3_foo.bar time_slice_format %Y%m%d%H%M time_slice_wait 10m utc store_as text </match> <match foo.baz> type s3 s3_bucket YOUR_BUCKET_NAME path mqtt/foo/baz s3_object_key_format %{path}/%{time_slice}_HOSTNAME_%{index}.%{file_extension} buffer_path /var/log/td-agent/s3_foo.baz time_slice_format %Y%m%d%H%M time_slice_wait 10m utc store_as text </match>
- MQTT のトピック
foo/bar
が fluentd のタグではfoo.bar
とスラッシュがドットに変換されます。 そのため、トピックfoo/bar
に対するマッチ条件はmatch foo.bar
と書きます。 type
をプラグイン名 s3 にします。- S3 にはgzip圧縮せず、AS IS のまま保存するため
store_as text
とします。
S3 のキーは以下で定義します。
s3_bucket
バケット名s3_object_key_format
カスタマイズしたキー名 (%{path}/%{time_slice}_HOSTNAME_%{index}.%{file_extension}
)
%{path}と %{time_slice_format} はそれぞれのパラメーターで定義したものに置き換えられます。
最終的には /mqtt/foo/bar/201511221156_HOSTNAME_0.txt というようなキー名になります。
ロードバランスされた複数の MQTT サーバーから S3 にメッセージ格納するため、S3 キーが被らないように、キー名にホスト名情報(_HOSTNAME_
の箇所。適宜変更してください) を含めています。
実際に保存されたS3キーが以下です。
$ aws s3 ls s3://YOUR_BUCKET_NAME/mqtt/foo/bar/ 2015-11-22 12:07:17 49 201511221156_HOSTNAME_0.txt 2015-11-22 12:12:17 55 201511221201_HOSTNAME_0.txt 2015-11-22 12:13:17 94 201511221202_HOSTNAME_0.txt 2015-11-22 12:15:17 103 201511221204_HOSTNAME_0.txt 2015-11-22 12:16:17 55 201511221205_HOSTNAME_0.txt ...
パラメーター buffer_path
ではバッファリングに利用するファイルパスを指定します。
fluentd のバッファリング機能は、logstash に対する大きなメリットのため、特別な理由がない限り活用しましょう。
buffer_path /var/log/td-agent/s3_foo.bar
のように定義すると /var/log/td-agent/s3_foo.bar.201511221525.b525253b0998677a0.log
というようにバッファリングファイルが作成されます。
fluentd の設定を有効にする
ファイルの設定変更を有効にします。
$ sudo service td-agent restart Retarting td-agent: [ OK ]
メッセージ送信&保存テスト
MQTT ブローカーにメッセージ送信
$ mosquitto_pub -d -t foo/baz -m a $ mosquitto_pub -d -t foo/bar -m 1
ローカルサーバーのバッファリングファイルを確認
$ cat /var/log/td-agent/s3_foo.bar.201511221538.b5252568fc7610d66.log 2015-11-22T15:38:08Z foo.bar {"message":"1"}
送信メッセージが含まれています。
S3 に PUT されたオブジェクトを確認
しばらく様子見して、S3 に反映されていることを確認します。
$ aws s3 ls s3://YOUR_BUCKET_NAME/mqtt/foo/bar/ ... 2015-11-22 15:48:09 456 201511221537_HOSTNAME_0.txt 2015-11-22 15:49:09 362 201511221538_HOSTNAME_0.txt 2015-11-22 15:50:09 408 201511221539_HOSTNAME_0.txt $ aws s3 cp s3://YOUR_BUCKET_NAME/mqtt/foo/bar/201511221538_HOSTNAME_0.txt - 2015-11-22T15:38:08Z foo.bar {"message":"1"} 2015-11-22T15:38:12Z foo.bar {"message":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"} 2015-11-22T15:38:31Z foo.bar {"message":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"} 2015-11-22T15:38:46Z foo.bar {"message":"aaaaaaaaaaa"} 2015-11-22T15:38:57Z foo.bar {"message":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
期待通り S3 に保存されています。
forest プラグインを使って match ルールをシンプルに書く
今回は対象のトピックが 2 つだけだったため、ほぼ同じ内容を2箇所にコピペして書きました。 トピックが5, 10 となると、設定の管理が手間ですね。
プラグイン forest を使うと、タグを動的に解釈させてすっきりと書けます。
先ほどの設定を forest を使って書き直してみましょう。
forest プラグインのインストール
まずは forest プラグインをインストールします。
$ sudo td-agent-gem install fluent-plugin-forest WARN: Unresolved specs during Gem::Specification.reset: json (>= 1.4.3) WARN: Clearing out unresolved specs. Please report a bug if this causes problems. Fetching: fluent-plugin-forest-0.3.0.gem (100%) Successfully installed fluent-plugin-forest-0.3.0 Parsing documentation for fluent-plugin-forest-0.3.0 Installing ri documentation for fluent-plugin-forest-0.3.0 Done installing documentation for fluent-plugin-forest after 0 seconds 1 gem installed
アウトプットの定義
forest プラグインを使ってアウトプットの match ディレクティブを書き直したのが以下です。
<match foo.**> type forest subtype s3 <template> s3_bucket YOUR_BUCKET_NAME path mqtt/foo/${tag_parts[1]} s3_object_key_format %{path}/%{time_slice}_HOSTNAME_%{index}.%{file_extension} buffer_path /var/log/td-agent/s3_${tag} time_slice_format %Y%m%d%H%M time_slice_wait 10m utc store_as text </template> </match>
肝は template
ディレクティブです。
ハードコードされていたタグ名を変数で置き換えています。
- path
mqtt/foo/bar
はmqtt/foo/${tag_parts[1]}
- buffer_path
/var/log/td-agent/s3_foo.bar
は/var/log/td-agent/s3_${tag}
という具体です。
このようにしておくと、タグ(トピック)のバリエーションが増えても、fluentd の設定を変更することなく、S3 に保存されます。
まとめ
今回はトピックをキーに S3 保存先を切り替える設定例を紹介しました。 MQTT のトピックと fluentd のタグは非常に相性が良いことを感じていただけましたでしょうか?